<?php

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Exchange\AMQPExchangeType;
use PhpAmqpLib\Message\AMQPMessage;

main();
function main(){
    //连接broker,创建一个rabbitmq连接
    $connection = new AMQPStreamConnection('127.0.0.1', 5672, 'guest', 'guest', '/');

    if(!$connection){
        echo "连接失败";
        exit();
    }

    /*声明一个通道*/
    $channel = $connection->channel();

    /*声明一个队列*/
    $channel->queue_declare('order_satistic_queue', false, true, false, false);

    /*每次消费限流1000*/
    $channel->basic_qos(null, 1000, null);

    /*开启消费监听*/
    $channel->basic_consume('order_satistic_queue', '', false, false, false, false, function ($message)
    {
        echo "\n--------\n";
        echo $message->body;
        echo "\n--------\n";

        /*从rabbitmq里面取出json数据*/
        $data_json = $message->body;

        /*把json数据编译后，放入$data变量，
        $data变量是个二维数组，代表多条mysql数据*/
        $data = json_decode($data_json, true);


        /*数据保存到mysql数据库*/
        saveToMysql($data, $message);
    });


    while ($channel->is_consuming()) {
        // 测试时10秒后让进程死掉，正式环境这里第三个参数要填0而不是10
        $channel->wait(null, false, 0);
    }
}

/*数据保存到统计库*/
function saveToMysql($data, $message){


    try {

        $db = new db("39.99.165.81","root","cdS1234567","atest");

        $is_has = $db->getOne(['table'=>'order_log','where_str'=>'AND order_no=450']) ;
        if($is_has){
            /*这里应该还有其它逻辑的*/
            $message->ack(true);// 这一行代码，底层是给rabbitmq推送一条消息，告诉他我消费成功，你可以删除消息了
            return;
        }

        /*开启事务*/
        $db->begin();

        $return1 = $db->exec("修改统计库的sql") ;
        $return2 = $db->exec("INSERT INTO order_log values (订单号)") ;

        echo "执行结果:"; print_r($return1); print_r($return2);

        /*提交事务*/
        if(!$db->commit()) {
            throw new Exception("执行sql操作失败！");
        }

        /*成功ack*/
        $message->ack(true);// 这一行代码，底层是给rabbitmq推送一条消息，告诉他我消费成功，你可以删除消息了


    } catch (Exception $e) {
        //print $e->getMessage();
        $db->rollBack();

        /*注意，这里nack要加一个次数限制，代码略，防止无限循环
        如果多次都失败，可以把它存到日志里面，再手动处理
        */

        /*失败nack*/
        $message->nack(true);// 这一行代码，底层是给rabbitmq推送一条消息，告诉他我消费失败，你不要删除消息
    }
}
